
feat: Added a new trait to expose SchemaProvider #1621

Open

parmesant wants to merge 2 commits into parseablehq:main from parmesant:query-updates
Conversation

parmesant (Contributor) commented Apr 15, 2026

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Crate root exposes core modules for easier access.
    • Added a streaming export for Arrow Flight consumers.
    • New billing/usage metric and public increment helper for hot-tier file scans.
  • Improvements

    • Introduced a pluggable, overridable schema-provider mechanism for customizable schema construction.
    • Made several schema/partitioning helpers and session-state creation publicly accessible for advanced use cases.

coderabbitai Bot commented Apr 15, 2026

Walkthrough

This PR adds crate-root re-exports (arrow, datafusion-proto, and catalog/utils aliases), introduces a globally overridable ParseableSchemaProvider behind a SCHEMA_PROVIDER cell, exposes Query::create_session_state, refactors session/schema registration to use the provider when present, extracts and exports partitioning and time-filter helpers, adds a per-date hot-tier Prometheus counter with a public increment helper, and adds an Arrow Flight streaming helper. Cargo.toml gains a datafusion-proto dependency.

Changes

Schema provider, query/session, metrics, and streaming helpers

Layer / File(s) — Summary

Global Extension Points / Data Shape — src/query/mod.rs
Adds pub trait ParseableSchemaProvider and pub static SCHEMA_PROVIDER: OnceCell<Box<dyn ParseableSchemaProvider>>, plus an ADDITIONAL_PHYSICAL_OPTIMIZER_RULES global (see the sketch after this table).

Core Query / Session Implementation — src/query/mod.rs
InMemorySessionContext::add_schema and Query::create_session_context now use SCHEMA_PROVIDER when set (calling new_provider(...)) and otherwise fall back to GlobalSchemaProvider. Query::create_session_state is made pub, and the builder now appends rules from ADDITIONAL_PHYSICAL_OPTIMIZER_RULES before build(). Query::execute reuses a local ctx.

Partitioning & Time-filter Helpers — src/query/stream_schema_provider.rs
Extracts and makes public partitioned_files(...) (moving partitioning/statistics/metrics out of the provider impl) and raises the visibility of reversed_mem_table, PartialTimeFilter::try_from_expr, expr_in_boundary, and extract_timestamp_bound to pub. Adds a call to increment_files_scanned_in_hottier_by_date(...) in the hot-tier path.

Metrics — src/metrics/mod.rs
Adds pub static TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE: Lazy<IntCounterVec> and pub fn increment_files_scanned_in_hottier_by_date(...).

Arrow Flight Streaming Helper — src/utils/arrow/flight.rs
Adds pub fn into_flight_data_stream(...), which converts a SendableRecordBatchStream into an Arrow Flight DoGet stream with IPC compression and error mapping.

Crate Exports & Dependency — src/lib.rs, Cargo.toml
Crate root re-exports arrow_array, arrow_flight, arrow_ipc, catalog as parseable_catalog, datafusion_proto, and utils as parseable_utils. Cargo.toml adds the dependency datafusion-proto = "53.1.0".

Call Sites / Wiring — src/query/mod.rs, src/query/...
Updates schema registration call sites to pass schema_provider.into(), converting the boxed provider into a SchemaProvider.
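
To make the extension point concrete, here is a minimal sketch of what the trait and cell could look like, inferred from the summaries above; the ObjectStore handle and the tenant representation are assumptions, not the PR's verbatim signatures.

// Minimal sketch inferred from the change summary; the storage handle and
// tenant types are assumptions, not the PR's actual types.
use std::sync::Arc;

use datafusion::catalog::SchemaProvider;
use object_store::ObjectStore;
use once_cell::sync::OnceCell;

/// Extension point: embedders supply their own schema construction logic.
pub trait ParseableSchemaProvider: Send + Sync {
    /// Build a DataFusion SchemaProvider for the given backend and tenant.
    fn new_provider(
        &self,
        storage: Option<Arc<dyn ObjectStore>>,
        tenant_id: &Option<String>,
    ) -> Box<dyn SchemaProvider>;
}

/// Installed at most once, before the first query session is built.
pub static SCHEMA_PROVIDER: OnceCell<Box<dyn ParseableSchemaProvider>> = OnceCell::new();

Registration sites can then hand the boxed result to catalog.register_schema via schema_provider.into(), since std provides a Box-to-Arc conversion for trait objects.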

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Query
    participant SCHEMA_PROVIDER
    participant ObjectStorage
    participant SessionState
    Client->>Query: create_session_context(tenant_id, storage?)
    alt SCHEMA_PROVIDER set
        Query->>SCHEMA_PROVIDER: new_provider(storage?, tenant_id)
        SCHEMA_PROVIDER->>Query: Box<dyn SchemaProvider>
    else fallback
        Query->>Query: instantiate GlobalSchemaProvider
        Query-->>Query: SchemaProvider
    end
    Query->>SessionState: register_schema(schema_provider.into())
    Query->>SessionState: build (append ADDITIONAL_PHYSICAL_OPTIMIZER_RULES)
    Client->>Query: execute(query)
    Query->>SessionState: create_physical_plan(...)
    Query->>ObjectStorage: read manifests (hot-tier)
    ObjectStorage-->>Query: manifest files
    Query->>metrics: increment_files_scanned_in_hottier_by_date(...)
    Query->>utils/arrow: into_flight_data_stream(stream)
    utils/arrow-->>Client: Flight DoGet stream
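
To illustrate the diagram's final steps, here is a hypothetical DoGet handler wired through the new helper; the handler name and the SQL-to-stream plumbing are assumptions, and only into_flight_data_stream and the Result<Response<DoGetStream>, Box<Status>> shape come from this PR.

// Hypothetical wiring, not the PR's code: execute a query and stream the
// result batches to a Flight client via the new helper. DoGetStream is the
// Flight DoGet stream alias used elsewhere in this PR.
use datafusion::prelude::SessionContext;
use tonic::{Response, Status};

async fn do_get_sketch(
    ctx: &SessionContext,
    sql: &str,
) -> Result<Response<DoGetStream>, Box<Status>> {
    let df = ctx
        .sql(sql)
        .await
        .map_err(|e| Box::new(Status::internal(e.to_string())))?;
    let stream = df
        .execute_stream()
        .await
        .map_err(|e| Box::new(Status::internal(e.to_string())))?;
    // into_flight_data_stream applies IPC compression and error mapping.
    into_flight_data_stream(stream)
}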

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable
  • nitisht

Poem

🐰 I hopped through schemas, wide and deep,
A provider cell for tenants to keep.
Counters ticking for files we find,
Flight streams hum—IPC compressed and kind.
Hop, parse, reply — a tidy little leap!

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

Description check — ⚠️ Warning
The PR description uses the template structure but contains no actual content; all sections are empty placeholders with no explanation of the goal, rationale, changes, or why modifications were made.
Resolution: Fill in the Description section with the goal, chosen solution, and key changes made across lib.rs, query/mod.rs, stream_schema_provider.rs, metrics, and flight utilities.

Docstring Coverage — ⚠️ Warning
Docstring coverage is 52.38%, which is below the required threshold of 80.00%.
Resolution: Write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (3 passed)

Title check — ✅ Passed
The title partially aligns with the changes but is incomplete; it mentions only the SchemaProvider trait while omitting major additions like re-exports, metrics, flight stream conversion, and public API expansions across five files.

Linked Issues check — ✅ Passed
Check skipped because no linked issues were found for this pull request.

Out of Scope Changes check — ✅ Passed
Check skipped because no linked issues were found for this pull request.


coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/query/mod.rs (1)

77-100: ⚠️ Potential issue | 🟠 Major

Prevent late SCHEMA_PROVIDER registration from silently no-oping.

SCHEMA_PROVIDER is only consulted when schemas are registered, but QUERY_SESSION is a process-wide Lazy. If the cell is set after the first QUERY_SESSION access, the default GlobalSchemaProvider remains registered for the lifetime of that session, so the new extension point never takes effect for existing schemas. Please either enforce provider initialization before any session access or rebuild the session context when the provider is installed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/query/mod.rs` around lines 77 - 100, SCHEMA_PROVIDER can be registered
too late and never affect the process-wide QUERY_SESSION (and
QUERY_SESSION_STATE) because QUERY_SESSION is a Lazy created with the old
provider; fix by ensuring provider initialization happens before any session
access or by rebuilding the session when a provider is installed: update the
code that sets SCHEMA_PROVIDER to, after successful OnceCell::set, call
Query::create_session_context(PARSEABLE.storage()) and replace the stored
session/context (QUERY_SESSION or its InMemorySessionContext.session_context)
and likewise refresh QUERY_SESSION_STATE via Query::create_session_state(...) so
the new ParseableSchemaProvider takes effect for existing sessions.
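
One way to satisfy this constraint, sketched under the assumption that the embedder controls startup order; install_schema_provider is an illustrative helper, not part of the PR:

// Illustrative startup wiring (not part of the PR): install the custom
// provider before the first QUERY_SESSION access, so the Lazy session is
// constructed with it instead of the default GlobalSchemaProvider.
fn install_schema_provider(provider: Box<dyn ParseableSchemaProvider>) {
    if SCHEMA_PROVIDER.set(provider).is_err() {
        // A second install would otherwise silently no-op; fail loudly.
        panic!("SCHEMA_PROVIDER was already initialized");
    }
    // Only after this point should anything touch QUERY_SESSION.
}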
🧹 Nitpick comments (2)
src/lib.rs (1)

59-64: Clarify stability expectations for these new crate-root re-exports.

At lines 59-62, 64, and 73, these pub use additions expand the public API surface. Please document whether they are part of a stable contract (or move them under a dedicated namespace) to avoid accidental long-term semver lock-in.

Also applies to: 73-73

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib.rs` around lines 59 - 64, These new crate-root re-exports
(arrow_array, arrow_flight, arrow_ipc, catalog as parseable_catalog, datafusion,
datafusion_proto) expand the public API surface; either mark them explicitly as
unstable/internal or move them under a dedicated namespace/module (e.g.,
reexports::arrow::*) and add a clear doc-comment on each symbol indicating
stability guarantees (stable API vs internal/experimental) so consumers won’t be
accidentally semver-locked; update the lib.rs entries for the listed pub use
items to point to the new module or add #[doc = "... stability: ..."] comments
and/or cfg(feature = "internal-reexports") gating as appropriate.
src/query/stream_schema_provider.rs (1)

701-701: Document the new public helper.

try_from_expr is now part of the public surface. Please add rustdoc describing the accepted expression shapes and the time_partition == None behavior so custom schema providers can depend on it safely.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/query/stream_schema_provider.rs` at line 701, Add Rustdoc for the newly
public helper try_from_expr in stream_schema_provider.rs: describe the accepted
expression shapes (e.g., exact/match on column names, literal values, supported
Expr variants) and any constraints the function assumes from Expr, and
explicitly document the behavior when time_partition is None (what is
returned/assumed and how partition-sensitive logic behaves). Reference the
function name try_from_expr and the Expr type in the doc so external/custom
schema providers know how to call it and what results or None means; keep the
doc concise and include examples of expected expression shapes in prose.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/query/mod.rs`:
- Around line 211-239: The schema provider registration is using
PARSEABLE.storage().get_object_store() instead of the storage passed into
create_session_context(storage), so schemas get registered against the wrong
backend; update both branches where SCHEMA_PROVIDER.new_provider and
GlobalSchemaProvider are constructed to use the caller's storage (the function
parameter named storage) by passing Some(storage.get_object_store()) or
storage.get_object_store() and tenant_id as before, and then call
catalog.register_schema as-is so the catalog is registered against the provided
storage rather than PARSEABLE.storage().


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 2b7d7002-715b-4e6c-a3c9-30b2bb3d8300

📥 Commits

Reviewing files that changed from the base of the PR and between eacb1b9 and 2aa314b.

📒 Files selected for processing (4)
  • Cargo.toml
  • src/lib.rs
  • src/query/mod.rs
  • src/query/stream_schema_provider.rs

Comment thread src/query/mod.rs

coderabbitai Bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/query/mod.rs (1)

319-338: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Undefined variable ctx will cause a compilation error.

The variable ctx is referenced on lines 319, 322, and 338 but is never defined. The execute method calls QUERY_SESSION.get_ctx() inline on line 302 but does not assign it to a local variable. This code will not compile.

🐛 Proposed fix

Add a local ctx binding before usage:

         if fields.is_empty() && !is_streaming {
             return Ok((Either::Left(vec![]), fields));
         }

+        let ctx = QUERY_SESSION.get_ctx();
         let plan = ctx.state().create_physical_plan(df.logical_plan()).await?;

         let results = if !is_streaming {
             let task_ctx = ctx.task_ctx();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/query/mod.rs` around lines 319 - 338, The error is that `ctx` is used but
not defined: before calling ctx.state(), ctx.task_ctx(), etc., assign the
session context returned from QUERY_SESSION.get_ctx() to a local variable (e.g.,
let ctx = QUERY_SESSION.get_ctx()?) in the execute function so subsequent calls
like ctx.state().create_physical_plan(...), collect_partitioned(plan.clone(),
ctx.task_ctx().clone()), get_total_bytes_scanned(&plan), and
increment_bytes_scanned_in_query_by_date(..., tenant) compile; locate the call
site where QUERY_SESSION.get_ctx() is currently invoked inline (in execute) and
replace it with a local binding named `ctx` used by create_physical_plan,
collect_partitioned, get_total_bytes_scanned, and
increment_bytes_scanned_in_query_by_date.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 6cbe7d76-b2bf-4e69-a043-307af0a8fcc7

📥 Commits

Reviewing files that changed from the base of the PR and between 2aa314b and 4e50143.

📒 Files selected for processing (3)
  • src/lib.rs
  • src/query/mod.rs
  • src/query/stream_schema_provider.rs
✅ Files skipped from review due to trivial changes (2)
  • src/query/stream_schema_provider.rs
  • src/lib.rs

coderabbitai Bot previously approved these changes May 4, 2026

coderabbitai Bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
src/query/mod.rs (1)

209-229: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Use the caller's storage when constructing schema providers.

create_session_context(storage) still registers schemas against PARSEABLE.storage().get_object_store() in both branches. That gives callers a session state for one backend and schema providers for another whenever they pass a non-default ObjectStorageProvider.

Suggested fix
 pub fn create_session_context(storage: Arc<dyn ObjectStorageProvider>) -> SessionContext {
     let state = Self::create_session_state(storage.clone());
+    let object_store = storage.get_object_store();

     let catalog = state
         .catalog_list()
         .catalog(&state.config_options().catalog.default_catalog)
         .expect("default catalog is provided by datafusion");
@@
                 for t in tenants.iter() {
                     let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() {
                         provider.new_provider(
-                            Some(PARSEABLE.storage().get_object_store()),
+                            Some(object_store.clone()),
                             &Some(t.to_owned()),
                         )
                     } else {
                         Box::new(GlobalSchemaProvider {
-                            storage: PARSEABLE.storage().get_object_store(),
+                            storage: object_store.clone(),
                             tenant_id: Some(t.to_owned()),
                         })
                     };
                     let _ = catalog.register_schema(t, schema_provider.into());
                 }
@@
             let schema_provider = if let Some(provider) = SCHEMA_PROVIDER.get() {
-                provider.new_provider(Some(PARSEABLE.storage().get_object_store()), &None)
+                provider.new_provider(Some(object_store.clone()), &None)
             } else {
                 Box::new(GlobalSchemaProvider {
-                    storage: PARSEABLE.storage().get_object_store(),
+                    storage: object_store,
                     tenant_id: None,
                 })
             };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/query/mod.rs` around lines 209 - 229, The session creation currently
always uses PARSEABLE.storage().get_object_store() when constructing schema
providers, causing providers to be bound to the default backend instead of the
caller-provided one; update create_session_context(storage) so both branches use
the passed-in storage (the function parameter named storage) when calling
SCHEMA_PROVIDER.get().new_provider(...) and when constructing
GlobalSchemaProvider (replace uses of PARSEABLE.storage().get_object_store()
with storage.get_object_store() or the appropriate accessor), ensuring
catalog.register_schema still receives the schema_provider created from the
caller's storage.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 435a0108-16bb-4acd-b14f-90dcc5110196

📥 Commits

Reviewing files that changed from the base of the PR and between 4e50143 and 2a23ced.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • Cargo.toml
  • src/metrics/mod.rs
  • src/query/mod.rs
  • src/query/stream_schema_provider.rs
  • src/utils/arrow/flight.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/query/stream_schema_provider.rs

Comment thread src/metrics/mod.rs
Comment on lines +260 to +270
pub static TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new(
            "total_files_scanned_in_hottier_by_date",
            "Total files scanned in hottier by date",
        )
        .namespace(METRICS_NAMESPACE),
        &["stream", "date", "tenant_id"],
    )
    .expect("metric can be created")
});

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="src/metrics/mod.rs"

echo "Occurrences of TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE:"
rg -nC2 'TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE' "$FILE"

echo
echo "Registration block in custom_metrics:"
sed -n '512,580p' "$FILE"

Repository: parseablehq/parseable

Length of output: 3289


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="src/metrics/mod.rs"

# Find the custom_metrics function definition and check its bounds
echo "=== custom_metrics function definition and scope ==="
sed -n '463,470p' "$FILE"

# Get the complete custom_metrics function (find where it ends)
echo
echo "=== End of custom_metrics function ==="
sed -n '575,585p' "$FILE"

# Search for any other registration attempts with the new metric
echo
echo "=== All occurrences of register and TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE in same context ==="
rg -n 'register.*TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE|TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE.*register' "$FILE"

# Confirm the metric is not already registered elsewhere
echo
echo "=== Checking if the metric appears in any register calls ==="
rg -n '\.register\(' "$FILE" | grep -i 'hottier' || echo "No register calls found for hottier metric"

Repository: parseablehq/parseable

Length of output: 920


Register this counter in custom_metrics.

TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE is declared at line 260 and incremented in the helper at lines 704-706, but it is never added to METRICS_REGISTRY inside custom_metrics. With the custom registry setup in this file, the metric will not be exposed on /metrics.

Proposed fix
     registry
         .register(Box::new(TOTAL_QUERY_CALLS_BY_DATE.clone()))
         .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE.clone()))
+        .expect("metric can be registered");
     registry
         .register(Box::new(TOTAL_FILES_SCANNED_IN_QUERY_BY_DATE.clone()))
         .expect("metric can be registered");
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/metrics/mod.rs` around lines 260 - 270, The new IntCounterVec
TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE is never registered with the custom
prometheus registry, so it won't be exposed; update the custom_metrics function
to call
METRICS_REGISTRY.register(Box::new(TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE.clone()))
(or the equivalent register method used in this module) alongside the other
metrics registration, handling any register errors consistently with existing
patterns so the counter is available on /metrics.
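
For context, the increment helper presumably looks something like the following, inferred from the metric's label set (stream, date, tenant_id); the parameter order and types are assumptions:

// Sketch inferred from the declared labels; parameter order and types are
// assumptions, not the PR's exact signature.
pub fn increment_files_scanned_in_hottier_by_date(
    count: u64,
    stream: &str,
    date: &str,
    tenant_id: &str,
) {
    TOTAL_FILES_SCANNED_IN_HOTTIER_BY_DATE
        .with_label_values(&[stream, date, tenant_id])
        .inc_by(count);
}

Once registered per the fix above, the counter then appears on /metrics as total_files_scanned_in_hottier_by_date.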

Comment thread src/utils/arrow/flight.rs
Comment on lines +161 to +163
let flight_data_stream = FlightDataEncoderBuilder::new()
    .with_max_flight_data_size(usize::MAX)
    .with_options(write_options)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify all Arrow Flight encoder call sites still use unbounded size.
rg -nP --type=rust 'with_max_flight_data_size\(\s*usize::MAX\s*\)'

Repository: parseablehq/parseable

Length of output: 219


🏁 Script executed:

#!/bin/bash
# Read the full context of both functions to understand their purpose and design
sed -n '146,190p' src/utils/arrow/flight.rs | cat -n

Repository: parseablehq/parseable

Length of output: 2116


🏁 Script executed:

#!/bin/bash
# Search for any configuration or constants related to Flight frame sizing
rg -i 'max.*flight.*size|flight.*chunk|frame.*size' --type=rust src/

Repository: parseablehq/parseable

Length of output: 365


🏁 Script executed:

#!/bin/bash
# Look for any comments or documentation about the encoding choice
sed -n '140,195p' src/utils/arrow/flight.rs

Repository: parseablehq/parseable

Length of output: 1956


Bound Flight frame size instead of usize::MAX.

Both into_flight_data_stream and into_flight_data use unbounded with_max_flight_data_size(usize::MAX), which allows the Arrow IPC encoder to accumulate data excessively before chunking. Although gRPC's max_frame_size is set to 16 MB in src/handlers/airplane.rs, this creates an implicit and inefficient constraint. Define an explicit constant and use it consistently across both functions instead.

Proposed fix
+const MAX_FLIGHT_DATA_SIZE: usize = 16 * 1024 * 1024;
+
 pub fn into_flight_data_stream(
     stream: datafusion::execution::SendableRecordBatchStream,
 ) -> Result<Response<DoGetStream>, Box<Status>> {
     let record_stream = stream.map_err(|e| {
         arrow_flight::error::FlightError::Arrow(arrow_schema::ArrowError::ExternalError(
             Box::new(e),
         ))
     });

     let write_options = IpcWriteOptions::default()
         .try_with_compression(Some(arrow_ipc::CompressionType(1)))
         .map_err(|err| Status::failed_precondition(err.to_string()))?;

     let flight_data_stream = FlightDataEncoderBuilder::new()
-        .with_max_flight_data_size(usize::MAX)
+        .with_max_flight_data_size(MAX_FLIGHT_DATA_SIZE)
         .with_options(write_options)
         .build(record_stream);

     let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));

     Ok(Response::new(Box::pin(flight_data_stream) as DoGetStream))
 }

 pub fn into_flight_data(records: Vec<RecordBatch>) -> Result<Response<DoGetStream>, Box<Status>> {
     let input_stream = futures::stream::iter(records.into_iter().map(Ok));
     let write_options = IpcWriteOptions::default()
         .try_with_compression(Some(arrow_ipc::CompressionType(1)))
         .map_err(|err| Status::failed_precondition(err.to_string()))?;

     let flight_data_stream = FlightDataEncoderBuilder::new()
-        .with_max_flight_data_size(usize::MAX)
+        .with_max_flight_data_size(MAX_FLIGHT_DATA_SIZE)
         .with_options(write_options)
         // .with_schema(schema.into())
         .build(input_stream);

     let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));

     Ok(Response::new(Box::pin(flight_data_stream) as DoGetStream))
 }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/arrow/flight.rs` around lines 161 - 163, Replace the unbounded
Flight frame size usage by defining a single explicit constant (e.g.
MAX_FLIGHT_FRAME_SIZE = 16 * 1024 * 1024) and use it wherever
FlightDataEncoderBuilder::with_max_flight_data_size currently receives
usize::MAX; update both into_flight_data_stream and into_flight_data to call
.with_max_flight_data_size(MAX_FLIGHT_FRAME_SIZE) so the Arrow IPC encoder
chunks data consistently with gRPC's frame size instead of allowing unlimited
accumulation.
